/*
 * Copyright (c) 2024, Alibaba Cloud;
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.aliyun.dataworks.migrationx.transformer.core.spark.command;

import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.List;
import java.util.logging.Logger;

/**
 * In-process launcher for Spark applications.
 * <p>
 * Use this class to start Spark applications programmatically. Applications launched using this class will run in the
 * same process as the caller.
 * <p>
 * Because Spark only supports a single active instance of {@code SparkContext} per JVM, code that uses this class
 * should be careful about which applications are launched. It's recommended that this launcher only be used to launch
 * applications in cluster mode.
 * <p>
 * Also note that, when running applications in client mode, JVM-related configurations (like driver memory or configs
 * which modify the driver's class path) do not take effect. Logging configuration is also inherited from the parent
 * application.
 *
 * @since Spark 2.3.0
 */
public class InProcessLauncher extends AbstractLauncher<InProcessLauncher> {

    private static final Logger LOG = Logger.getLogger(InProcessLauncher.class.getName());

    /** Preferred entry point (SPARK-22941): the newer SparkSubmit interface with better error handling. */
    private static final String IN_PROCESS_SPARK_SUBMIT = "org.apache.spark.deploy.InProcessSparkSubmit";

    /** Fallback entry point, used when launcher and Spark versions are mixed and matched. */
    private static final String LEGACY_SPARK_SUBMIT = "org.apache.spark.deploy.SparkSubmit";

    /**
     * Starts a Spark application.
     * <p>
     * The SparkSubmit entry point is resolved first, before the launcher server is requested, so a missing
     * Spark class path fails before any server or handle has been created. The handle is then registered with
     * the server, and the server port and the secret returned by the registration are stored in the launcher
     * configuration before the spark-submit arguments are built.
     *
     * @param listeners Listeners to add to the handle before the app is launched.
     * @return A handle for the launched application.
     * @throws IOException If the SparkSubmit class or its {@code main} method cannot be found.
     * @see AbstractLauncher#startApplication(SparkAppHandle.Listener...)
     */
    @Override
    public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException {
        if (builder.isClientMode(builder.getEffectiveConfig())) {
            LOG.warning("It's not recommended to run client-mode applications using InProcessLauncher.");
        }

        Method main = findSparkSubmit();
        LauncherServer server = LauncherServer.getOrCreateServer();
        InProcessAppHandle handle = new InProcessAppHandle(server);
        for (SparkAppHandle.Listener l : listeners) {
            handle.addListener(l);
        }

        // The connection settings must be in the configuration before the arguments are built from it.
        String secret = server.registerHandle(handle);
        setConf(LauncherProtocol.CONF_LAUNCHER_PORT, String.valueOf(server.getPort()));
        setConf(LauncherProtocol.CONF_LAUNCHER_SECRET, secret);

        List<String> sparkArgs = builder.buildSparkSubmitArgs();
        String[] argv = sparkArgs.toArray(new String[0]);

        String appName = CommandBuilderUtils.firstNonEmpty(builder.appName, builder.mainClass,
            "<unknown>");
        handle.start(appName, main, argv);
        return handle;
    }

    /**
     * Returns this launcher.
     *
     * @return this instance.
     */
    @Override
    InProcessLauncher self() {
        return this;
    }

    /**
     * Locates the static {@code main(String[])} method of the SparkSubmit entry point.
     * <p>
     * Classes are looked up through the current thread's context class loader, or through the class loader of
     * this class when the thread has no context class loader.
     *
     * @return the public {@code main(String[])} method of the resolved SparkSubmit class.
     * @throws IOException If neither SparkSubmit class can be loaded, or the class has no accessible
     *                     {@code main(String[])} method.
     */
    // Visible for testing.
    Method findSparkSubmit() throws IOException {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (cl == null) {
            cl = getClass().getClassLoader();
        }

        Class<?> sparkSubmit = loadSparkSubmitClass(cl);

        Method main;
        try {
            main = sparkSubmit.getMethod("main", String[].class);
        } catch (Exception e) {
            throw new IOException("Cannot find SparkSubmit main method.", e);
        }

        CommandBuilderUtils.checkState(Modifier.isStatic(main.getModifiers()),
            "main method is not static.");
        return main;
    }

    /**
     * Loads the SparkSubmit class, preferring the in-process variant and falling back to the legacy one.
     *
     * @param cl class loader used for both lookups.
     * @return the first of the two SparkSubmit classes that could be loaded.
     * @throws IOException If both lookups fail. The failure of the fallback lookup is the cause and the
     *                     failure of the preferred lookup is attached as a suppressed exception.
     */
    private static Class<?> loadSparkSubmitClass(ClassLoader cl) throws IOException {
        // SPARK-22941: first try the new SparkSubmit interface that has better error handling,
        // but fall back to the old interface in case someone is mixing & matching launcher and
        // Spark versions.
        try {
            return cl.loadClass(IN_PROCESS_SPARK_SUBMIT);
        } catch (Exception e1) {
            try {
                return cl.loadClass(LEGACY_SPARK_SUBMIT);
            } catch (Exception e2) {
                IOException failure = new IOException(
                    "Cannot find SparkSubmit; make sure necessary jars are available.", e2);
                // Keep the first failure visible in the stack trace instead of dropping it.
                failure.addSuppressed(e1);
                throw failure;
            }
        }
    }

}
